package cn.xshi.sentinel.log;

import cn.xshi.sentinel.log.task.MetricLogTask;
import com.alibaba.csp.sentinel.config.SentinelConfig;
import com.alibaba.csp.sentinel.node.metric.MetricNode;
import com.alibaba.csp.sentinel.node.metric.MetricSearcher;
import com.alibaba.csp.sentinel.node.metric.MetricWriter;
import com.alibaba.csp.sentinel.util.PidUtil;
import cn.xshi.sentinel.Constants;
import cn.xshi.sentinel.util.FlowRuleUtil;
import cn.xshi.sentinel.util.SpringApplicationUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import java.util.List;
import java.util.concurrent.ForkJoinPool;

@Component
@Slf4j
public class MetricLog{
    private static boolean isRun = true;
    private static Long updateTime = 0L;
    private volatile MetricSearcher searcher;
    private final Object lock = new Object();
    private static final int times = 60*1000;//60秒同步一次

    /**
     * 查询记录并同步
     */
    public void execute() {
        new Thread(new MetricLogRunnable()).start();
    }

    /**
     *
     */
    class MetricLogRunnable implements Runnable {
        public void run() {
            while (isRun){
                try {
                    long endTime = System.currentTimeMillis();
                    if(null == updateTime || updateTime == 0L){
                        updateTime =  System.currentTimeMillis()-times;//推迟1分钟，作为开始时间
                    }
                    findMetricLog(updateTime,endTime,null);
                    updateTime = endTime;//修改最后更新时间
                    Thread.sleep(times);
                }catch (Exception e){
                    log.error("查询记录并同步异常：{}",e);
                }
            }
        }
    }

    /**
     * 查找数据
     * @param beginTimeMs
     * @param endTimeMs
     * @param identity
     */
    public void findMetricLog(long beginTimeMs, long endTimeMs, String identity){
        FlowRuleUtil flowRuleUtil = SpringApplicationUtils.getBean(FlowRuleUtil.class);
        if(StringUtils.isBlank(flowRuleUtil.getSentinelKafkaAddress())){
            log.warn("无法获取到KafkaServer参数配置，不能进行日志查询操作！");
            return;
        }
        try {
            if (searcher == null) {
                synchronized (lock) {
                    //获取应用名
                    String appName = SentinelConfig.getAppName();
                    if (appName == null) {
                        appName = "";
                    }
                    if (searcher == null) {
                        //用来找metric文件，
                        searcher = new MetricSearcher(MetricWriter.METRIC_BASE_DIR,
                                MetricWriter.formMetricFileName(appName, PidUtil.getPid()));
                    }
                }
            }
            List<MetricNode> metricNodes = searcher.findByTimeAndResource(beginTimeMs,endTimeMs,identity);
            sync(SentinelConfig.getAppName(),metricNodes);
        }catch (Exception e){
            log.error("查询日志异常：{}",e);
        }
    }

    /**
     * 同步数据
     * @param metricNodes
     */
    public void sync(String appName,List<MetricNode> metricNodes){
        initForkJoinPool(appName,metricNodes);
    }

    /**
     *
     */
    public void initForkJoinPool(String appName,List<MetricNode> metricNodes){
        int size = Runtime.getRuntime().availableProcessors();//获取本系统的有效线程数，设置线程池为有效线程的两倍。
        ForkJoinPool forkJoinPool = new ForkJoinPool(size*2);
        try {
            if(CollectionUtils.isEmpty(metricNodes)){
                return;
            }
            MetricLogTask initTask = new MetricLogTask(appName,metricNodes, Constants.METRIC_GROUP_NUMBER);
            //方法一 同步
            Integer result = forkJoinPool.invoke(initTask);
            log.info("推送日志数量："+result+" 条");
        }catch (Exception e){
            if(null != forkJoinPool){
                forkJoinPool.shutdown();
            }
        }finally {
            if(null != forkJoinPool){
                forkJoinPool.shutdown();
            }
        }
    }

    /**
     *  销毁
     */
    public void destroy(){
        isRun = false;
    }
}
